RidgeRun Metadata/Use Cases/UAV metadata streaming

From RidgeRun Developer Wiki

Use Case: Surveillance or Reconnaissance Drones

In defense, security, and monitoring applications, drones (UAVs) equipped with cameras are commonly used to capture real-time video. However, the value of this video increases significantly when it is enriched with metadata that provides contextual information such as GPS coordinates, altitude, camera orientation, precise timestamps, and more.

The most widely adopted standards for transmitting this type of information are those published by the MISB (Motion Imagery Standards Board). MISB standards define how to structure and transport metadata relevant to visual intelligence. This metadata can be encapsulated and transmitted alongside the video using the MPEG Transport Stream (MPEG-TS) container, which supports multiplexing of video, audio, and metadata into a single synchronized stream.

Applicable RidgeRun Products

MPEG-TS

By leveraging RidgeRun's MPEG-TS metadata framework, developers can easily integrate synchronized, real-time metadata into video pipelines used in UAV applications. The solution simplifies the injection and extraction process using the metasrc and metasink elements, and supports both asynchronous and synchronous metadata conforming to standards like MISB ST 1402.

This provides a standards-compliant way to enrich video streams with mission-critical data, which suits drones engaged in surveillance, reconnaissance, or tactical operations.

Code Example

This script creates a real-time GStreamer pipeline that muxes H.264 video and KLV metadata (MISB ST 0601) into MPEG-TS and streams it over RTP/UDP to 127.0.0.1:5000. Every 500 ms it generates a KLV packet that sets a single tag, tag 9, which ST 0601 assigns to Platform Indicated Airspeed (the label shown in the receiver output below). The value steps from 35 to 45 and then wraps back to 35 to simulate dynamic sensor data. The packet is serialized and injected into the stream through the metadata-binary property of the metasrc element.
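The value sequence can be checked in isolation, without GStreamer. The following is a minimal sketch that reuses the BASE_VALUE, DELTA, and OFFSETS constants from the sender; the tag_values generator is a hypothetical helper introduced only for this illustration.

```python
from itertools import cycle, islice

# Same constants as the sender script
BASE_VALUE = 40
DELTA = 5
OFFSETS = list(range(-DELTA, DELTA + 1))  # [-5, -4, ..., +5]


def tag_values():
    """Yield the value sent on each timer tick, forever (hypothetical helper)."""
    for offset in cycle(OFFSETS):
        yield BASE_VALUE + offset


# One full pass over OFFSETS plus the first value of the next pass
first_cycle = list(islice(tag_values(), len(OFFSETS) + 1))
print(first_cycle)
```

The sequence climbs from 35 to 45 in steps of 1 and then restarts at 35, so a receiver should see a sawtooth rather than a smooth up-and-down sweep.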

This setup is ideal for simulating a live video feed with embedded, time-synchronized telemetry, making it suitable for testing systems that process MPEG-TS with KLV metadata. The example also includes a receiver pipeline that uses udpsrc to capture the RTP stream and extract the KLV metadata, allowing real-time verification of the injected values.

This project uses LibMISB, a library developed by RidgeRun for encoding and decoding KLV metadata based on MISB standards. It simplifies the integration of real-time telemetry into video streams by handling the serialization process automatically, which is particularly useful for standards like MISB ST 0601 (the sender below encodes tag 9 of that standard).
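To make the serialization step more concrete, the sketch below hand-encodes a single local-set item as tag, BER length, and value. It is an illustration of the KLV layout only, not LibMISB's implementation; ber_length and encode_item are hypothetical helper names, and the sketch assumes single-byte tags (below 128).

```python
def ber_length(n: int) -> bytes:
    """BER length field: short form below 128, long form otherwise."""
    if n < 0:
        raise ValueError("length must be non-negative")
    if n < 128:
        return bytes([n])
    body = n.to_bytes((n.bit_length() + 7) // 8, "big")
    return bytes([0x80 | len(body)]) + body


def encode_item(tag: int, value: bytes) -> bytes:
    """Encode one local-set item as tag + BER length + value bytes."""
    if not 0 < tag < 128:
        raise ValueError("this sketch only handles single-byte tags")
    return bytes([tag]) + ber_length(len(value)) + value


# Tag 9 carrying the one-byte value 41 (0x29)
item = encode_item(9, bytes([41]))
print(item.hex())
```

The resulting three bytes, 09 01 29, are the same bytes that appear for tag 9 in the hex dump of the Output section. A complete packet also carries the 16-byte universal key, the overall length, and the timestamp, version, and checksum items, which LibMISB adds in the example output.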

MISB Standards: LibMISB


Sender

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import sys, signal, gi
gi.require_version('RrHelper', '1.0')
gi.require_version('Gst', '1.0')
gi.require_version('GObject', '2.0')
gi.require_version('GLib', '2.0')
from gi.repository import Gst, GLib, RrHelper

# MISB
from libmisb import LibMisb, Metadata, metadata_item, LogLevel
from libmisb.formatter import JsonFormatter

# --- MISB Config ---
ST0601_KEY = "060E2B34020B01010E01030101000000"  # ST 0601 (UAS Local Set) universal key
TAG = "9"                                        # the only tag whose value changes
BASE_VALUE = 40                                  # centre of the simulated range
DELTA = 5                                        # range is BASE_VALUE ± DELTA
INTERVAL_MS = 500                                # update period in milliseconds
OFFSETS = list(range(-DELTA, DELTA + 1))         # [-5..+5], one entry used per tick

def build_items(lst):
    out = []
    for entry in lst:
        mi = metadata_item()
        mi.tag = entry["tag"]
        mi.value = entry["value"]
        mi.sub_items = []
        mi.local_set = True
        out.append(mi)
    return out

def encode_single_tag(lm, key_hex: str, tag: str, value_str: str) -> bytes:
    meta = Metadata()
    meta.set_key(key_hex)
    meta.set_items(build_items([{"tag": tag, "value": value_str}]))
    pkt, status = lm.encode(meta)
    if status != 0:
        raise RuntimeError(f"MISB encode failed: {status}")
    return pkt

# MISB Encoder
lm = LibMisb()
lm.set_formatter(JsonFormatter())
lm.set_log_level(LogLevel.INFO)

# ==========================
# Pipeline with metasrc → mpegtsmux → RTP/UDP
# ==========================
PIPELINE_DESCRIPTION = (
    # Declare the mux first, then feed its pads with video and metadata
    "mpegtsmux name=mux ! rtpmp2tpay pt=33 ! udpsink host=127.0.0.1 port=5000 "
    # Video branch
    "videotestsrc is-live=true pattern=smpte ! "
    "video/x-raw,width=640,height=360,framerate=30/1 ! "
    "x264enc tune=zerolatency key-int-max=30 bitrate=2000 speed-preset=ultrafast ! "
    "h264parse ! mux. "
    # Metadata (KLV) branch
    "metasrc name=meta ! mux."
)

class TsMetaSender:
    def __init__(self):
        self.pipeline = None
        self.meta = None
        self.loop = None
        self.idx = 0

    def create_pipeline(self):
        self.loop = GLib.MainLoop()
        try:
            self.pipeline = Gst.parse_launch(PIPELINE_DESCRIPTION)
        except GLib.Error as e:
            print(f"Unable to build pipeline: {e.message}", file=sys.stderr)
            return False

        self.meta = self.pipeline.get_by_name("meta")
        if not self.meta:
            print("Could not get metasrc element", file=sys.stderr)
            return False

        # Bus
        bus = self.pipeline.get_bus()
        bus.add_signal_watch()
        bus.connect("message", self.on_bus_message)

        # Ctrl+C
        signal.signal(signal.SIGINT, lambda *_: self.quit())

        # Update KLV every 500 ms and push it through metasrc (metadata-binary)
        GLib.timeout_add(INTERVAL_MS, self._tick_update_klv)
        return True

    def _tick_update_klv(self):
        try:
            value = BASE_VALUE + OFFSETS[self.idx]
            pkt = encode_single_tag(lm, ST0601_KEY, TAG, f"{value}")
            # metasrc exposes 'metadata-binary' for injection from app
            RrHelper.set_gbytearray_property(self.meta, "metadata-binary", pkt)
            print(f"[tick] tag{TAG}={value:.5f}  KLV={pkt.hex()[:64]}... (len={len(pkt)})")
            self.idx = (self.idx + 1) % len(OFFSETS)
        except Exception as e:
            print(f"Update error: {e}", file=sys.stderr)
        return True

    def on_bus_message(self, _bus, msg):
        if msg.type == Gst.MessageType.EOS:
            print("End of stream")
            self.quit()
        elif msg.type == Gst.MessageType.ERROR:
            err, dbg = msg.parse_error()
            print(f"Error: {err} {dbg or ''}", file=sys.stderr)
            self.quit()

    def start(self):
        self.pipeline.set_state(Gst.State.PLAYING)
        print("Running… MPEG-TS + KLV (metasrc) over RTP/UDP :5000")
        self.loop.run()

    def quit(self):
        if self.loop and self.loop.is_running():
            self.loop.quit()
        if self.pipeline:
            self.pipeline.set_state(Gst.State.NULL)

if __name__ == "__main__":
    Gst.init(None)
    app = TsMetaSender()
    if not app.create_pipeline():
        sys.exit(1)
    app.start()

Receiver

gst-launch-1.0 -e -v \
  udpsrc port=5000 \
    caps="application/x-rtp,media=video,encoding-name=MP2T,clock-rate=90000,payload=33" ! \
  rtpmp2tdepay ! \
  'video/mpegts, systemstream=(boolean)true, packetsize=(int)188' ! \
  tsdemux name=demux \
  demux. ! queue ! meta/x-klv ! misbparser ! metasink async=false

Output

=========================
key:	6 E 2B 34 2 B 1 1 E 1 3 1 1 0 0 0 
Length:	20
Value:
--------
--------
[0]
Tag:	[2](Timestamp)
Length:	8
Value:
	A9 80 29 94 03 3E 06 00 
--------
--------
[1]
Tag:	[9](Platform Indicated Airspeed)
Length:	1
Value:
	29 
--------
--------
[2]
Tag:	[65](UAS DataLink LS Version Number)
Length:	1
Value:
	13 
--------
--------
[3]
Tag:	[1](Checksum)
Length:	2
Value:
	61 0C 
00000000 (0x71b038009970): 06 0e 2b 34 02 0b 01 01 0e 01 03 01 01 00 00 00  ..+4............
00000010 (0x71b038009980): 14 02 08 a9 80 29 94 03 3e 06 00 09 01 29 41 01  .....)..>....)A.
00000020 (0x71b038009990): 13 01 02 61 0c                                   ...a.           
=========================
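The hex dump above can also be walked by hand to confirm the structure the parser reports. The following is a minimal sketch, not the misbparser implementation: it copies the 37 bytes from the dump and splits them into key, length, and items. The helper names read_ber_length and parse_local_set are hypothetical, and single-byte tags are assumed.

```python
# Bytes copied from the hex dump in the Output section
PACKET = bytes.fromhex(
    "06 0e 2b 34 02 0b 01 01 0e 01 03 01 01 00 00 00 "
    "14 02 08 a9 80 29 94 03 3e 06 00 09 01 29 41 01 "
    "13 01 02 61 0c"
)


def read_ber_length(buf: bytes, pos: int):
    """Return (length, position after the length field)."""
    first = buf[pos]
    if first < 0x80:                      # short form: the byte is the length
        return first, pos + 1
    count = first & 0x7F                  # long form: next `count` bytes hold it
    end = pos + 1 + count
    return int.from_bytes(buf[pos + 1:end], "big"), end


def parse_local_set(packet: bytes):
    """Split a KLV local set into its 16-byte key and (tag, value) items."""
    key, pos = packet[:16], 16
    length, pos = read_ber_length(packet, pos)
    end = pos + length
    items = []
    while pos < end:
        tag = packet[pos]                 # assumption: single-byte tags only
        size, pos = read_ber_length(packet, pos + 1)
        items.append((tag, packet[pos:pos + size]))
        pos += size
    return key, items


key, items = parse_local_set(PACKET)
print("key:", key.hex())
for tag, value in items:
    print(f"tag {tag}: {value.hex()}")
```

The walk yields tags 2, 9, 65, and 1 in that order, and the single value byte of tag 9 is 0x29, which is 41 in decimal and falls inside the 35 to 45 range produced by the sender.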